package com.boat.kafka.produce.impl;

import java.util.List;
import java.util.Random;

import javax.annotation.Resource;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.boat.kafka.exception.JmsException;
import com.boat.kafka.exception.MQException;
import com.boat.kafka.produce.KafkaMessageProducer;

@Service
public class KafkaMessageProducerImpl implements KafkaMessageProducer {

	@Resource
	private Producer<String, String> producer;

	@Value("${kafka.producer.message-topic}")
	private String messageTopic;

	@Value("${kafka.producer.message-numPartitions}")
	private int messageNumPartitions;

	@Override
	public void sendText(String message, boolean flush) throws JmsException {
		try {
			ProducerRecord<String, String> producerData = new ProducerRecord<String, String>(messageTopic, message);
			producer.send(producerData);
			if (flush) {
				producer.flush();
			}
		} catch (Exception e) {
			throw new MQException(e);
		}
	}

	@Override
	public void sendText(List<String> message, boolean flush) throws JmsException {
		try {
			ProducerRecord<String, String> producerData = null;
			for (String string : message) {
				producerData = new ProducerRecord<String, String>(messageTopic, string);
				producer.send(producerData);
			}
			if (flush) {
				producer.flush();
			}
		} catch (Exception e) {
			throw new MQException(e);
		}
	}

	@Override
	public void sendText(String message, boolean flush, String keyToPartition) throws JmsException {
		try {
			ProducerRecord<String, String> producerData = new ProducerRecord<String, String>(messageTopic, partitionDefine(keyToPartition), null, message);
			producer.send(producerData);
			if (flush) {
				producer.flush();
			}
		} catch (Exception e) {
			throw new MQException(e);
		}
	}

	//相同的key发送到相同的分区（单分区的数据是有序的）
	private int partitionDefine(String keyToPartition) {
		if (keyToPartition == null) {
			return new Random().nextInt(messageNumPartitions);
		} else {
			return Math.abs(keyToPartition.hashCode()) % messageNumPartitions;
		}
	}

}